package org.xhy.oap.web;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaConsumer {

    @Autowired
    private RestHighLevelClient client;


    @KafkaListener(topics = "trace_kafka", groupId = "group")
    public void listen(String message) throws IOException {
        IndexRequest request = new IndexRequest("trace_02"); // Elasticsearch 索引
        request.source(message, XContentType.JSON);        // 插入的内容是 JSON 格式
        IndexResponse response = client.index(request, RequestOptions.DEFAULT); // 执行插入操作
        System.out.println("Document inserted with ID: " + response.getId());
        System.out.println("Received Message: " + message);
    }
}
